package broker

import (
	"context"
	"fmt"

	"gitee.com/hexug/devcloud/maudit/apps/audit"
	"gitee.com/hexug/devcloud/maudit/common/logger"
	"github.com/segmentio/kafka-go"
)

// DEFAULT_TOPIC is the Kafka topic name that NewDefaultClient passes to
// NewClient. The ALL_CAPS spelling is kept because the identifier is exported.
const DEFAULT_TOPIC = "audit_log"

// Client bundles a kafka.Writer with the broker addresses it was created for.
// Instances are built by NewClient / NewDefaultClient and released with Close.
type Client struct {
	brokerAddress []string      // broker addresses exactly as passed to NewClient
	writer        *kafka.Writer // writer that SendAuditLog publishes through
}

// NewClient returns a Client whose writer targets the given broker addresses
// and publishes to topic.
//
// The writer uses the LeastBytes balancer and has automatic topic creation
// enabled. The broker slice is stored on the Client as-is (it is not copied).
func NewClient(broker []string, topic string) *Client {
	client := &Client{brokerAddress: broker}
	client.writer = &kafka.Writer{
		Addr:                   kafka.TCP(broker...),
		Topic:                  topic,
		Balancer:               &kafka.LeastBytes{},
		AllowAutoTopicCreation: true,
	}
	return client
}

// NewDefaultClient returns a Client for the given broker addresses that
// publishes to DEFAULT_TOPIC.
func NewDefaultClient(broker []string) *Client {
	client := NewClient(broker, DEFAULT_TOPIC)
	return client
}

// Close closes the underlying Kafka writer and returns the error it reports,
// if any.
func (c *Client) Close() error {
	closeErr := c.writer.Close()
	return closeErr
}

// SendAuditLog sends an audit log to Kafka.
//
// The log is converted with in.ToJsonByte and written as the value of a single
// message through the client's writer, using ctx for the write call.
//
// It returns an error when the conversion yields nil, or the writer's error
// when the write fails; a write failure is also logged before being returned.
func (c *Client) SendAuditLog(ctx context.Context, in *audit.AuditLog) error {
	b := in.ToJsonByte()
	if b == nil {
		return fmt.Errorf("日志 %+v 转换失败", in)
	}
	err := c.writer.WriteMessages(ctx, kafka.Message{Value: b})
	if err != nil {
		// The format string must carry a verb for err; without one a
		// printf-style logger renders the argument as "%!(EXTRA ...)" noise
		// instead of a readable error message.
		logger.L().Errorf("写入kafka失败: %v", err)
		return err
	}
	return nil
}
